
SparkSQL DataSet Operators in Practice

Environment

Scala version: 2.12.8
JDK version: 1.8
Spark version: 2.4.1

Prepare the Data

sku_sale_amount.csv

10001,36B0,烟机   ,2015,100
10001,27A3,烟机 ,2016,100
10001,27A3,烟机 ,2017,200
10002,36B0,烟机 ,2015,100
10002,36B0,烟机 ,2016,100
10002,36B0,烟机 ,2017,200
10002,58B5,灶具 ,2014,200
10002,58B5,灶具 ,2015,200
10002,58B5,灶具 ,2016,200
10002,58B5,灶具 ,2017,200
10003,64B8,洗碗机 ,2014,200
10003,727T,智能消毒柜,2014,200
10004,64B8,净水器 ,2014,150
10004,64B8,净水器 ,2015,50
10004,64B8,净水器 ,2016,50
10004,45A8,净水器 ,2017,50
10004,64B8,嵌入式蒸箱,2014,150
10004,64B8,嵌入式蒸箱,2015,50
10004,64B8,嵌入式蒸箱,2016,50
10004,45A8,嵌入式蒸箱,2017,50

sku_product.csv

10001,1000
10001,2000
10001,4000
10001,6000
10002,8000
10002,10000
10002,9000
10002,6000
10003,5000
10004,4000
10004,3000
10004,44000
10004,11000

Set Up the Environment

import org.apache.spark.sql.{Row, SparkSession}

object DataSetSingleOperating {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("DataSetSingleOperating")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Sales amount per goods_id / sku / category / year
    val skuIncomeDF = spark.read.format("csv")
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "false")
      .load("src/main/resources/data/sku_sale_amount.csv")
      .toDF("goods_id", "sku", "category", "year", "amount")

    // Sales volume per goods_id
    val skuProductDF = spark.read.format("csv")
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "false")
      .load("src/main/resources/data/sku_product.csv")
      .toDF("goods_id1", "volume")

    // Register temp views so the same data can also be queried with spark.sql
    skuIncomeDF.createTempView("sku_sale_amount")
    skuProductDF.createTempView("sku_product")
  }
}
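
Before running the operator examples below, a quick sanity check of what was loaded can be useful; this is a small optional sketch (not part of the original listing) using the two DataFrames defined above:

// Inspect the inferred schema and a few rows.
skuIncomeDF.printSchema()
skuIncomeDF.show(5, false)
skuProductDF.show(5, false)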

select

skuIncomeDF.select($"goods_id", $"sku", $"category", $"year".as("date"), $"amount")
skuIncomeDF.select('goods_id, 'sku, 'category, 'year.as("date"), 'amount)
skuIncomeDF.select(col("goods_id"), col("sku"), col("category"), col("year").as("date"), 'amount)
skuIncomeDF.select(expr("goods_id"), expr("sku"), expr("category"), expr("year as date"), expr("amount+1"))
spark.sql(
  """
    |select sku,category,year,amount from sku_sale_amount
  """.stripMargin)

selectExpr

skuIncomeDF.selectExpr("goods_id", "sku", "category", "year as date", "amount+1 as amount")

filter

skuIncomeDF.filter($"amount" > 150)
skuIncomeDF.filter(col("amount") > 150)
skuIncomeDF.filter('amount > 150)
skuIncomeDF.filter("amount > 150")
skuIncomeDF.filter(row => row.getInt(4) > 150) // amount is the fifth field (index 4)

where

skuIncomeDF.where($"amount" > 150)
skuIncomeDF.where(col("amount") > 150)
skuIncomeDF.where('amount > 150)
skuIncomeDF.where("amount > 150")
spark.sql(
  """
    |select sku,category,year,amount from sku_sale_amount where amount > 150
  """.stripMargin)

union

// Dataset.union keeps duplicate rows (like SQL UNION ALL)
skuIncomeDF.union(skuIncomeDF).groupBy("sku").count()
spark.sql(
  """
    |select sku,count(1) from (
    |select sku,category,year,amount from sku_sale_amount
    | union all
    |select sku,category,year,amount from sku_sale_amount
    |)a group by sku
  """.stripMargin)

// union followed by distinct removes duplicates (like SQL UNION)
skuIncomeDF.union(skuIncomeDF).distinct().groupBy("sku").count()
spark.sql(
  """
    |select sku,count(1) from (
    |select sku,category,year,amount from sku_sale_amount
    | union
    |select sku,category,year,amount from sku_sale_amount
    |)a group by sku
  """.stripMargin)

groupBy

skuIncomeDF.groupBy("sku").count()
skuIncomeDF.groupBy($"sku").count()
skuIncomeDF.groupBy('sku).count()
skuIncomeDF.groupBy(col("sku")).count()

join

skuIncomeDF.join(skuIncomeDF, "sku")
skuIncomeDF.join(skuIncomeDF, Seq("sku", "category"))
skuIncomeDF.join(skuIncomeDF, Seq("sku"), "inner")

skuIncomeDF.join(skuProductDF, $"goods_id" === $"goods_id1")
skuIncomeDF.join(skuProductDF, col("goods_id") === col("goods_id1"))
skuIncomeDF.join(skuProductDF, col("goods_id").equalTo(col("goods_id1")))
skuIncomeDF.joinWith(skuProductDF, skuIncomeDF("goods_id") === skuProductDF("goods_id1"), "inner")
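
Note that joinWith keeps each side intact, so on two DataFrames it returns a Dataset[(Row, Row)] rather than a flat DataFrame. A small illustrative sketch (not from the original post) of unpacking that pair, using the column names defined above:

// Flatten the (income row, product row) pairs produced by joinWith.
skuIncomeDF
  .joinWith(skuProductDF, skuIncomeDF("goods_id") === skuProductDF("goods_id1"), "inner")
  .map { case (income, product) =>
    (income.getAs[Int]("goods_id"), income.getAs[Int]("amount"), product.getAs[Int]("volume"))
  }
  .toDF("goods_id", "amount", "volume")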

orderBy

Under the hood, orderBy simply delegates to sort.

skuIncomeDF.orderBy("sku", "category", "amount")
skuIncomeDF.orderBy($"sku", $"category", $"amount".desc)
skuIncomeDF.orderBy(col("sku"), col("category"), col("amount").desc)

sort

skuIncomeDF.sort("sku", "category", "amount")
skuIncomeDF.sort($"sku", $"category", $"amount".desc)
skuIncomeDF.sort(col("sku"), col("category"), col("amount").desc)

sortWithinPartitions

Sorts within each partition only, i.e. a local (per-partition) sort rather than a global one.

skuIncomeDF.sortWithinPartitions("sku", "category")
skuIncomeDF.sortWithinPartitions($"sku", $"category", $"amount".desc)
skuIncomeDF.sortWithinPartitions(col("sku"), col("category").desc)
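
To make the "local sort" visible, one option (an illustrative sketch, not in the original post) is to repartition by a key first, so that each partition is then sorted independently:

// One partition group per category (hash partitioned), each sorted locally by year.
skuIncomeDF
  .repartition($"category")
  .sortWithinPartitions($"year")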

withColumn

Purpose: if the target column already exists, withColumn replaces it; otherwise it adds a new column. withColumnRenamed renames an existing column.

skuIncomeDF.withColumn("amount", $"amount" + 1)
skuIncomeDF.withColumn("amount", 'amount + 1)
skuIncomeDF.withColumnRenamed("amount", "amount1")
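
For the "column does not exist yet" case, a small sketch (the column name amount_doubled is made up for illustration):

// Adds a new column, since "amount_doubled" is not in the schema yet.
skuIncomeDF.withColumn("amount_doubled", $"amount" * 2)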

foreach

skuIncomeDF.foreach(row => println(row.get(0)))

foreachPartition

skuIncomeDF.foreachPartition((it: Iterator[Row]) => {
  it.foreach(row => println(row.get(0)))
})
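
foreachPartition is typically preferred over foreach when there is per-partition setup cost, such as opening a connection once per partition. A minimal sketch of that pattern, with the expensive resource left as a placeholder comment:

skuIncomeDF.foreachPartition((it: Iterator[Row]) => {
  // Hypothetical: acquire one expensive resource per partition here (e.g. a DB connection).
  val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
  it.foreach(row => buffer += row.mkString(","))
  // ... flush the buffer / release the resource once per partition ...
  println(s"processed ${buffer.size} rows in this partition")
})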

distinct

skuIncomeDF.distinct()

dropDuplicates

skuIncomeDF.dropDuplicates("sku")
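
dropDuplicates keeps one arbitrary row per key. It also accepts several columns; a short sketch (not in the original post) deduplicating by (sku, year):

// One row kept per distinct (sku, year) combination.
skuIncomeDF.dropDuplicates("sku", "year")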

drop

Removes one or more columns.

skuIncomeDF.drop("sku", "category")

cube

Note: equivalent to grouping by (category, year), (category), (year), and () separately and then summing amount.
Reference: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-multi-dimensional-aggregation.html

skuIncomeDF.cube($"category", $"year".cast("string").as("year"))
  .agg(sum("amount"))
  .sort(col("category").desc_nulls_first, col("year").desc_nulls_first)
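
For reference, a rough SQL equivalent of the cube above, written with GROUPING SETS against the sku_sale_amount view registered earlier (a sketch, not from the original post):

spark.sql(
  """
    |select category, year, sum(amount) as amount
    |from sku_sale_amount
    |group by category, year grouping sets ((category, year), (category), (year), ())
  """.stripMargin)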

rollup

Note: equivalent to grouping by (category, year), (category), and () separately and then summing amount.
Reference: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-multi-dimensional-aggregation.html

skuIncomeDF.rollup($"category", $"year".cast("string").as("year"))
  .agg(sum("amount") as "amount", grouping_id() as "gid")
  .sort($"category".desc_nulls_last, $"year".asc_nulls_last)
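
Likewise, a rough SQL equivalent of the rollup using GROUPING SETS (a sketch, not from the original post):

spark.sql(
  """
    |select category, year, sum(amount) as amount
    |from sku_sale_amount
    |group by category, year grouping sets ((category, year), (category), ())
  """.stripMargin)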

pivot

// year was inferred as an integer column, so the pivot values are given as integers.
skuIncomeDF.groupBy("category")
  .pivot("year", Seq(2014, 2015, 2016, 2017))
  .agg(sum("amount"))
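
Spark 2.4 also understands a SQL PIVOT clause. A rough equivalent against the registered view (a sketch, not from the original post); the subquery keeps only category, year and amount so that category is the sole implicit grouping key:

spark.sql(
  """
    |select * from (
    |  select category, year, amount from sku_sale_amount
    |) pivot (
    |  sum(amount) for year in (2014, 2015, 2016, 2017)
    |)
  """.stripMargin)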

Transpose

Source: http://bailiwick.io/2017/10/21/transpose-data-with-spark/

// Import the requisite methods
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

// Create a dataframe
val df = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", "col6")

df.show(10, false)

// Create the transpose user defined function.
// Inputs:
//   transDF: The dataframe which will be transposed
//   transBy: The columns that the dataframe will be transposed by
// Output:
//   A dataframe consisting of the transBy columns plus:
//     column_name
//     column_value
def transposeUDF(transDF: DataFrame, transBy: Seq[String]): DataFrame = {
  // Collect names and types of all non-key columns; they must share one data type.
  val (cols, types) = transDF.dtypes.filter { case (c, _) => !transBy.contains(c) }.unzip
  require(types.distinct.size == 1)

  // Build an array of (column_name, column_value) structs and explode it into rows.
  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("column_name"), col(c).alias("column_value"))): _*
  ))
  val byExprs = transBy.map(col(_))

  transDF
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.column_name", $"_kvs.column_value"): _*)
}

transposeUDF(df, Seq("uid")).show(12, false)
Output:
df.show(10,false)
+---+----+----+----+----+----+----+
|uid|col1|col2|col3|col4|col5|col6|
+---+----+----+----+----+----+----+
|1 |1 |2 |3 |8 |4 |5 |
|2 |4 |3 |8 |7 |9 |8 |
|3 |6 |1 |9 |2 |3 |6 |
|4 |7 |8 |6 |9 |4 |5 |
|5 |9 |2 |7 |8 |7 |3 |
|6 |1 |1 |4 |2 |8 |4 |
+---+----+----+----+----+----+----+


transposeUDF(df, Seq("uid")).show(12,false)
+---+-----------+------------+
|uid|column_name|column_value|
+---+-----------+------------+
|1 |col1 |1 |
|1 |col2 |2 |
|1 |col3 |3 |
|1 |col4 |8 |
|1 |col5 |4 |
|1 |col6 |5 |
|2 |col1 |4 |
|2 |col2 |3 |
|2 |col3 |8 |
|2 |col4 |7 |
|2 |col5 |9 |
|2 |col6 |8 |
+---+-----------+------------+
only showing top 12 rows
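
As a side note (not from the referenced article), a similar unpivot can be sketched with the built-in stack generator via selectExpr, listing the six value columns explicitly:

// stack(n, name1, value1, ...) emits n (column_name, column_value) rows per input row.
df.selectExpr(
  "uid",
  "stack(6, 'col1', col1, 'col2', col2, 'col3', col3, 'col4', col4, 'col5', col5, 'col6', col6) as (column_name, column_value)"
).show(12, false)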

References

https://legacy.gitbook.com/book/jaceklaskowski/mastering-spark-sql/details
https://www.toutiao.com/i6631318012546793992/
